package config

import (
	"bufio"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"encoding/hex"
	"fmt"
	"io/ioutil"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"gopkg.in/yaml.v2"
)

// App defines Kafka-Pixy application configuration. It mirrors the structure
// of the YAML configuration file (see FromYAML / FromYAMLFile).
type App struct {
	// TCP address that gRPC API server should listen on. newApp sets it to
	// "0.0.0.0:19091" before the configuration file is applied.
	GRPCAddr string `yaml:"grpc_addr"`

	// TCP address that HTTP API server should listen on. newApp sets it to
	// "0.0.0.0:19092" before the configuration file is applied.
	TCPAddr string `yaml:"tcp_addr"`

	// Unix domain socket address that HTTP API server should listen on.
	// Listening on a unix domain socket is disabled by default.
	UnixAddr string `yaml:"unix_addr"`

	// An arbitrary number of proxies to different Kafka/ZooKeeper clusters can
	// be configured. Each proxy configuration is identified by a cluster name.
	Proxies map[string]*Proxy `yaml:"proxies"`

	// Default cluster is the one to be used in API calls that do not start with
	// prefix `/clusters/<cluster>`. If it is not explicitly provided, then the
	// one mentioned in the `Proxies` section first is assumed.
	DefaultCluster string `yaml:"default_cluster"`

	// TLS is the application TLS configuration. GRPCSecurityOpts reads it to
	// build the gRPC server credentials.
	TLS `yaml:"tls"`

	// Logging holds the configuration of individual loggers.
	// NOTE(review): the field has no yaml tag, so the key is presumably the
	// lowercased field name (`logging`) — confirm against yaml.v2 behavior.
	Logging []LoggerCfg
}

// LoggerCfg represents a configuration of an individual logger.
//
// NOTE(review): the struct tags below are `json` tags although App is decoded
// with a YAML parser; the YAML keys presumably fall back to the lowercased
// field names, which happen to match — confirm before renaming any field.
type LoggerCfg struct {
	// Name defines a logger to be used. It can be one of: console, syslog, or
	// udplog.
	Name string `json:"name"`

	// Severity indicates the minimum severity a logger will be logging messages at.
	// It is parsed by Level; unparsable values fall back to the warning level.
	Severity string `json:"severity"`

	// Logger parameters
	Params map[string]string `json:"params"`
}

// Level translates the configured Severity into a logrus level. A severity
// string that logrus cannot parse yields log.WarnLevel.
func (lc *LoggerCfg) Level() log.Level {
	if parsed, err := log.ParseLevel(lc.Severity); err == nil {
		return parsed
	}
	return log.WarnLevel
}

// Proxy defines configuration of a proxy to a particular Kafka/ZooKeeper
// cluster. Default values for every section are assigned by
// defaultProxyWithClientID before the configuration file is applied.
type Proxy struct {
	// Unique ID that identifies a Kafka-Pixy instance in both ZooKeeper and
	// Kafka. It is automatically generated by default and it is recommended to
	// leave it like that.
	ClientID string `yaml:"client_id"`

	// Kafka holds connection parameters of the Kafka cluster.
	Kafka struct {

		// List of seed Kafka peers that Kafka-Pixy should access to resolve
		// the Kafka cluster topology.
		SeedPeers []string `yaml:"seed_peers"`

		// Version of the Kafka cluster. Supported versions are 0.10.2.1 - 2.0.0
		// NOTE(review): KafkaVersion.UnmarshalText accepts version strings
		// from 0.8.2.2 through 2.3.0, so the range above looks stale — confirm.
		Version KafkaVersion

		// Optionally use TLS when connecting to Kafka. This must be enabled
		// for following options to be used.
		TLSEnabled bool `yaml:"tls"`

		// The path to the CA certificate (PEM)
		CACertFile string `yaml:"ca_certificate_file"`

		// The path to the Client Certificate (PEM)
		ClientCertFile string `yaml:"client_certificate_file"`

		// The path to the Client Key (PEM)
		ClientCertKeyFile string `yaml:"client_key_file"`

		// From the tls package:
		//  InsecureSkipVerify controls whether a client verifies the
		//  server's certificate chain and host name.
		//  If InsecureSkipVerify is true, TLS accepts any certificate
		//  presented by the server and any host name in that certificate.
		//  In this mode, TLS is susceptible to man-in-the-middle attacks.
		//  This should be used only for testing.
		InsecureSkipVerify bool `yaml:"insecure"`
	} `yaml:"kafka"`

	// ZooKeeper holds connection parameters of the ZooKeeper cluster.
	ZooKeeper struct {

		// List of seed ZooKeeper peers that Kafka-Pixy should access to
		// resolve the ZooKeeper cluster topology.
		SeedPeers []string `yaml:"seed_peers"`

		// A root directory in ZooKeeper to store consumers data.
		Chroot string `yaml:"chroot"`

		// ZooKeeper session timeout has to be a minimum of 2 times the
		// tickTime (as set in the server configuration) and a maximum of 20
		// times the tickTime. The default ZooKeeper tickTime is 2 seconds.
		//
		// See http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
		SessionTimeout time.Duration `yaml:"session_timeout"`
	} `yaml:"zoo_keeper"`

	// Networking timeouts. These all pass through to sarama's `config.Net`
	// field.
	Net struct {
		// How long to wait for the initial connection.
		DialTimeout time.Duration `yaml:"dial_timeout"`

		// How long to wait for a response.
		ReadTimeout time.Duration `yaml:"read_timeout"`

		// How long to wait for a transmit.
		WriteTimeout time.Duration `yaml:"write_timeout"`
	} `yaml:"net"`

	// Producer holds parameters of the producer module. Most of them are
	// copied into the sarama configuration by SaramaProducerCfg.
	Producer struct {

		// Size of all buffered channels created by the producer module.
		ChannelBufferSize int `yaml:"channel_buffer_size"`

		// Size of maximum message in bytes
		MaxMessageBytes int `yaml:"max_message_bytes"`

		// The type of compression to use on messages. Accepted values are
		// `none`, `gzip`, `snappy`, and `lz4`.
		Compression Compression `yaml:"compression"`

		// The best-effort number of bytes needed to trigger a flush.
		FlushBytes int `yaml:"flush_bytes"`

		// The best-effort frequency of flushes.
		FlushFrequency time.Duration `yaml:"flush_frequency"`

		// How long to wait for the cluster to settle between retries.
		RetryBackoff time.Duration `yaml:"retry_backoff"`

		// The total number of times to retry sending a message.
		RetryMax int `yaml:"retry_max"`

		// The level of acknowledgement reliability needed from the broker.
		// Accepted values are `no_response`, `wait_for_local`, and
		// `wait_for_all`.
		RequiredAcks RequiredAcks `yaml:"required_acks"`

		// Period of time that Kafka-Pixy should keep trying to submit buffered
		// messages to Kafka. It is recommended to make it large enough to survive
		// a ZooKeeper leader election in your setup.
		ShutdownTimeout time.Duration `yaml:"shutdown_timeout"`

		// How to assign incoming messages to a Kafka partition. Defaults to
		// using a hash of the specified message key, or random if the key is
		// unspecified. Accepted values are `hash`, `random`, and `roundrobin`.
		Partitioner PartitionerConstructor `yaml:"partitioner"`

		// The timeout to specify on individual produce requests to the broker.
		// The broker will wait for replication to complete up to this duration
		// before returning an error.
		Timeout time.Duration `yaml:"timeout"`
	} `yaml:"producer"`

	// Consumer holds parameters of the consumer module.
	Consumer struct {
		// If set, Kafka-Pixy will not configure a consumer, and any attempts to
		// call the consumer APIs will return an error.
		Disabled bool `yaml:"disabled"`

		// Period of time that Kafka-Pixy should wait for an acknowledgement
		// before retrying.
		AckTimeout time.Duration `yaml:"ack_timeout"`

		// Size of all buffered channels created by the consumer module.
		ChannelBufferSize int `yaml:"channel_buffer_size"`

		// The number of bytes of messages to attempt to fetch for each
		// topic-partition in each fetch request. These bytes will be read into
		// memory for each partition, so this helps control the memory used by
		// the consumer. The fetch request size must be at least as large as
		// the maximum message size the server allows or else it is possible
		// for the producer to send messages larger than the consumer can fetch.
		FetchMaxBytes int `yaml:"fetch_max_bytes"`

		// The maximum amount of time the server will block before answering
		// the fetch request if there isn't data immediately available.
		FetchMaxWait time.Duration `yaml:"fetch_max_wait"`

		// Consume request will wait at most this long for a message from a
		// topic to become available before expiring.
		LongPollingTimeout time.Duration `yaml:"long_polling_timeout"`

		// The maximum number of unacknowledged messages allowed for a
		// particular group-topic-partition at a time. When this number is
		// reached subsequent consume requests will return long polling timeout
		// errors, until some of the pending messages are acknowledged.
		MaxPendingMessages int `yaml:"max_pending_messages"`

		// The maximum number of retries Kafka-Pixy will make to offer an
		// unack message. Messages that exceeded the number of retries are
		// discarded by Kafka-Pixy and acknowledged in Kafka. Zero retries
		// means that messages will be offered just once.
		//
		// If you want Kafka-Pixy to retry indefinitely, then set this
		// parameter to -1.
		MaxRetries int `yaml:"max_retries"`

		// How frequently to commit offsets to Kafka.
		OffsetsCommitInterval time.Duration `yaml:"offsets_commit_interval"`

		// Kafka-Pixy should wait this long after it gets notification that a
		// consumer joined/left a consumer group it is a member of before
		// rebalancing.
		RebalanceDelay time.Duration `yaml:"rebalance_delay"`

		// If a request to a Kafka-Pixy fails for any reason, then it should
		// wait this long before retrying.
		RetryBackoff time.Duration `yaml:"retry_backoff"`

		// Period of time that Kafka-Pixy should keep subscription to
		// a topic by a group in absence of requests from the consumer group.
		SubscriptionTimeout time.Duration `yaml:"subscription_timeout"`
	} `yaml:"consumer"`
}

// KafkaVersion wraps a sarama.KafkaVersion so that it can be parsed from a
// textual version string (see UnmarshalText).
type KafkaVersion struct {
	// v is the wrapped sarama version value.
	v sarama.KafkaVersion
}

// UnmarshalText parses a textual Kafka version (e.g. "1.1.0") and stores the
// matching sarama version in kv. Several patch releases that sarama has no
// constant for are mapped to the nearest lower version it does define. An
// error is returned, and kv is left untouched, for any unknown string.
func (kv *KafkaVersion) UnmarshalText(text []byte) error {
	known := map[string]sarama.KafkaVersion{
		"0.8.2.2":  sarama.V0_8_2_2,
		"0.9.0.0":  sarama.V0_9_0_0,
		"0.9.0.1":  sarama.V0_9_0_1,
		"0.10.0.0": sarama.V0_10_0_0,
		"0.10.0.1": sarama.V0_10_0_1,
		"0.10.1.0": sarama.V0_10_1_0,
		"0.10.2.0": sarama.V0_10_2_0,
		"0.10.2.1": sarama.V0_10_2_1,
		"0.10.2.2": sarama.V0_10_2_1, // sarama does not define V0_10_2_2
		"0.11.0.0": sarama.V0_11_0_0,
		"0.11.0.1": sarama.V0_11_0_1,
		"0.11.0.2": sarama.V0_11_0_2,
		"0.11.0.3": sarama.V0_11_0_2, // sarama does not define V0_11_0_3
		"1.0.0":    sarama.V1_0_0_0,
		"1.0.1":    sarama.V1_0_0_0, // sarama does not define V1_0_1_0
		"1.0.2":    sarama.V1_0_0_0, // sarama does not define V1_0_2_0
		"1.1.0":    sarama.V1_1_0_0,
		"1.1.1":    sarama.V1_1_1_0,
		"2.0.0":    sarama.V2_0_0_0,
		"2.0.1":    sarama.V2_0_1_0,
		"2.1.0":    sarama.V2_1_0_0,
		"2.1.1":    sarama.V2_1_0_0, // sarama does not define V2_1_1_0
		"2.2.0":    sarama.V2_2_0_0,
		"2.2.1":    sarama.V2_2_0_0, // sarama does not define V2_2_1_0
		"2.3.0":    sarama.V2_3_0_0,
	}

	name := string(text)
	if version, found := known[name]; found {
		kv.v = version
		return nil
	}
	return errors.Errorf("bad kafka version, %s", name)
}

// Set replaces the wrapped version with v.
func (kv *KafkaVersion) Set(v sarama.KafkaVersion) {
	*kv = KafkaVersion{v: v}
}

// IsAtLeast reports whether the wrapped version is at least v, as decided by
// sarama.KafkaVersion.IsAtLeast.
func (kv *KafkaVersion) IsAtLeast(v sarama.KafkaVersion) bool {
	current := kv.v
	return current.IsAtLeast(v)
}

// Compression is a sarama.CompressionCodec that can be parsed from text.
type Compression sarama.CompressionCodec

// UnmarshalText parses one of "none", "gzip", "snappy", or "lz4" into the
// corresponding sarama codec. Any other string yields an error and leaves c
// unchanged.
func (c *Compression) UnmarshalText(text []byte) error {
	var codec sarama.CompressionCodec
	switch name := string(text); name {
	case "none":
		codec = sarama.CompressionNone
	case "gzip":
		codec = sarama.CompressionGZIP
	case "snappy":
		codec = sarama.CompressionSnappy
	case "lz4":
		codec = sarama.CompressionLZ4
	default:
		return errors.Errorf("bad compression, %s", name)
	}
	*c = Compression(codec)
	return nil
}

// RequiredAcks is a sarama.RequiredAcks that can be parsed from text.
type RequiredAcks sarama.RequiredAcks

// UnmarshalText parses one of "no_response", "wait_for_local", or
// "wait_for_all" into the corresponding sarama acknowledgement level. Any
// other string yields an error and leaves ra unchanged.
func (ra *RequiredAcks) UnmarshalText(text []byte) error {
	str := string(text)
	v, ok := map[string]sarama.RequiredAcks{
		"no_response":    sarama.NoResponse,
		"wait_for_local": sarama.WaitForLocal,
		"wait_for_all":   sarama.WaitForAll,
	}[str]
	if !ok {
		// The message used to say "bad compression" (copy-paste from the
		// Compression parser), which pointed users at the wrong parameter.
		return errors.Errorf("bad required acks, %s", str)
	}
	*ra = RequiredAcks(v)
	return nil
}

// PartitionerConstructor is the textual name of a sarama partitioner.
type PartitionerConstructor string

// ToPartitionerConstructor resolves the name ("hash", "random", or
// "roundrobin") to the matching sarama partitioner constructor. An unknown
// name yields a nil constructor and an error.
func (pc PartitionerConstructor) ToPartitionerConstructor() (sarama.PartitionerConstructor, error) {
	switch pc {
	case "hash":
		return sarama.NewHashPartitioner, nil
	case "random":
		return sarama.NewRandomPartitioner, nil
	case "roundrobin":
		return sarama.NewRoundRobinPartitioner, nil
	}
	return nil, errors.Errorf("bad partitioner: %s", pc)
}

// SaramaProducerCfg builds a sarama configuration for the producer module
// from the proxy's client ID, Kafka version, network timeouts, and Producer
// section. When Kafka TLS is enabled the TLS settings are attached as well.
func (p *Proxy) SaramaProducerCfg() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.ClientID = p.ClientID
	cfg.Version = p.Kafka.Version.v
	cfg.ChannelBufferSize = p.Producer.ChannelBufferSize

	cfg.Net.DialTimeout = p.Net.DialTimeout
	cfg.Net.ReadTimeout = p.Net.ReadTimeout
	cfg.Net.WriteTimeout = p.Net.WriteTimeout

	src, dst := &p.Producer, &cfg.Producer
	dst.MaxMessageBytes = src.MaxMessageBytes
	dst.Compression = sarama.CompressionCodec(src.Compression)
	dst.RequiredAcks = sarama.RequiredAcks(src.RequiredAcks)
	dst.Timeout = src.Timeout
	dst.Flush.Bytes = src.FlushBytes
	dst.Flush.Frequency = src.FlushFrequency
	dst.Retry.Max = src.RetryMax
	dst.Retry.Backoff = src.RetryBackoff
	// The error is dropped on purpose: the partitioner name is checked by
	// validate. An unknown name results in a nil partitioner here.
	partitioner, _ := src.Partitioner.ToPartitionerConstructor()
	dst.Partitioner = partitioner

	if !p.Kafka.TLSEnabled {
		return cfg
	}
	tlsCfg, _ := p.newTLSConfig() // Ok to ignore err since we validated
	cfg.Net.TLS.Enable = true
	cfg.Net.TLS.Config = tlsCfg
	return cfg
}

// SaramaClientCfg builds a sarama configuration that carries the proxy's
// client ID, Kafka version, network timeouts, and the consumer channel buffer
// size. When Kafka TLS is enabled the TLS settings are attached as well.
func (p *Proxy) SaramaClientCfg() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.ClientID = p.ClientID
	cfg.Version = p.Kafka.Version.v
	cfg.ChannelBufferSize = p.Consumer.ChannelBufferSize

	timeouts := p.Net
	cfg.Net.DialTimeout = timeouts.DialTimeout
	cfg.Net.ReadTimeout = timeouts.ReadTimeout
	cfg.Net.WriteTimeout = timeouts.WriteTimeout

	if !p.Kafka.TLSEnabled {
		return cfg
	}
	tlsCfg, _ := p.newTLSConfig() // Ok to ignore err since we validated
	cfg.Net.TLS.Enable = true
	cfg.Net.TLS.Config = tlsCfg
	return cfg
}

// newTLSConfig builds the TLS client configuration used for Kafka
// connections from the Kafka section of the proxy config.
//
// The CA certificate is loaded only when CACertFile is set, and the client
// certificate only when both ClientCertFile and ClientCertKeyFile are set.
// It returns an error if a configured file cannot be read, if the CA file
// contains no parsable PEM certificate, or if the client certificate and key
// do not form a valid pair.
func (p *Proxy) newTLSConfig() (*tls.Config, error) {
	tlsConfig := &tls.Config{
		InsecureSkipVerify: p.Kafka.InsecureSkipVerify,
	}

	if p.Kafka.CACertFile != "" {
		// build root CA
		caCert, err := ioutil.ReadFile(p.Kafka.CACertFile)
		if err != nil {
			return nil, err
		}
		roots := x509.NewCertPool()
		if !roots.AppendCertsFromPEM(caCert) {
			// AppendCertsFromPEM only reports a bool. The previous code
			// returned the nil read error here, i.e. (nil, nil).
			return nil, fmt.Errorf("no valid PEM certificate found in %s", p.Kafka.CACertFile)
		}
		tlsConfig.RootCAs = roots
	}

	if p.Kafka.ClientCertFile != "" && p.Kafka.ClientCertKeyFile != "" {
		// setup client certs
		cert, err := tls.LoadX509KeyPair(p.Kafka.ClientCertFile, p.Kafka.ClientCertKeyFile)
		if err != nil {
			return nil, err
		}
		tlsConfig.Certificates = []tls.Certificate{cert}
	}

	tlsConfig.BuildNameToCertificate()
	return tlsConfig, nil
}

// DefaultApp builds an application configuration that contains a single
// proxy with default settings, registered under the given cluster name, and
// makes that cluster the default one.
func DefaultApp(cluster string) *App {
	app := newApp()
	app.DefaultCluster = cluster
	app.Proxies[cluster] = DefaultProxy()
	return app
}

// DefaultProxy returns a proxy configuration populated with default values
// and a freshly generated client ID.
func DefaultProxy() *Proxy {
	id := newClientID()
	return defaultProxyWithClientID(id)
}

// FromYAMLFile reads the whole file at filename and hands its content to
// FromYAML, which parses and validates it. Errors from opening or reading the
// file are returned as is.
func FromYAMLFile(filename string) (*App, error) {
	f, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	raw, err := ioutil.ReadAll(f)
	if err != nil {
		return nil, err
	}
	// FromYAML returns a nil config together with any error, so its result
	// can be forwarded directly.
	return FromYAML(raw)
}

// FromYAML parses configuration from a YAML string and performs basic
// validation of parameters.
//
// The document is decoded twice: once into App for the top-level settings,
// and once into an order-preserving structure so that each proxy section can
// be applied on top of the default proxy values. All proxies share one
// generated client ID unless they set their own. If no default cluster is
// configured, the first proxy listed in the document becomes the default.
//
// It returns a nil config and an error if the YAML cannot be parsed, if a
// cluster name is not a string, or if validation fails.
func FromYAML(data []byte) (*App, error) {
	appCfg := newApp()
	if err := yaml.Unmarshal(data, appCfg); err != nil {
		return nil, errors.Wrap(err, "failed to parse config")
	}

	// for the sake of dealing with default values (see below),
	// unmarshal this again as a different struct
	var prob proxyProb
	if err := yaml.Unmarshal(data, &prob); err != nil {
		return nil, errors.Wrap(err, "failed to parse configuration")
	}
	clientID := newClientID()
	for _, proxyItem := range prob.Proxies {
		cluster, ok := proxyItem.Key.(string)
		if !ok {
			// Report the offending key itself; cluster is the empty string
			// when the type assertion fails.
			return nil, errors.Errorf("invalid cluster, %v", proxyItem.Key)
		}
		// A hack with marshaling and unmarshaling of a Proxy structure is
		// used here to preserve default values. If we try to unmarshal entire
		// App config, then proxy structures are overridden with zero Proxy
		// values.
		encodedProxyCfg, err := yaml.Marshal(proxyItem.Value)
		if err != nil {
			// Return the failure instead of panicking in library code.
			return nil, errors.Wrapf(err, "failed to encode proxy config, cluster=%s", cluster)
		}
		proxyCfg := defaultProxyWithClientID(clientID)
		if err := yaml.Unmarshal(encodedProxyCfg, proxyCfg); err != nil {
			return nil, errors.Wrapf(err, "failed to parse proxy config, cluster=%s", cluster)
		}
		appCfg.Proxies[cluster] = proxyCfg
		if appCfg.DefaultCluster == "" {
			appCfg.DefaultCluster = cluster
		}
	}

	if err := appCfg.validate(); err != nil {
		return nil, errors.Wrap(err, "invalid config parameter")
	}

	return appCfg, nil
}

// validate checks that at least one proxy is configured and that every
// configured proxy passes its own validation. The first failure found is
// returned, annotated with the cluster name. Proxies are visited in map
// iteration order, so which failure is reported first is not fixed when
// several proxies are invalid.
func (a *App) validate() error {
	if len(a.Proxies) == 0 {
		return errors.New("at least one proxy must be configured")
	}
	for cluster, proxyCfg := range a.Proxies {
		if err := proxyCfg.validate(); err != nil {
			return errors.Wrapf(err, "invalid config, cluster=%s", cluster)
		}
	}
	return nil
}

// validate checks the Producer and Consumer parameters against their allowed
// ranges, verifies that the partitioner name is known, and validates the
// Kafka TLS files. It returns an error describing the first violation found,
// or nil if the proxy configuration is acceptable.
func (p *Proxy) validate() error {
	// Validate the Producer parameters.
	switch {
	case p.Producer.ChannelBufferSize <= 0:
		return errors.New("producer.channel_buffer_size must be > 0")
	case p.Producer.FlushBytes < 0:
		return errors.New("producer.flush_bytes must be >= 0")
	case p.Producer.FlushFrequency < 0:
		return errors.New("producer.flush_frequency must be >= 0")
	case p.Producer.RetryBackoff <= 0:
		return errors.New("producer.retry_backoff must be > 0")
	case p.Producer.RetryMax <= 0:
		return errors.New("producer.retry_max must be > 0")
	case p.Producer.ShutdownTimeout < 0:
		return errors.New("producer.shutdown_timeout must be >= 0")
	case p.Producer.Timeout < 0:
		return errors.New("producer.timeout must be >= 0")
	}
	if _, err := p.Producer.Partitioner.ToPartitionerConstructor(); err != nil {
		return fmt.Errorf("producer.partitioner is invalid: %q", err)
	}
	// Validate the Consumer parameters.
	switch {
	case p.Consumer.AckTimeout <= 0:
		return errors.New("consumer.ack_timeout must be > 0")
	case p.Consumer.ChannelBufferSize <= 0:
		return errors.New("consumer.channel_buffer_size must be > 0")
	case p.Consumer.FetchMaxBytes <= 0:
		// The message names the actual config key, `fetch_max_bytes`.
		return errors.New("consumer.fetch_max_bytes must be > 0")
	case p.Consumer.LongPollingTimeout <= 0:
		return errors.New("consumer.long_polling_timeout must be > 0")
	case p.Consumer.MaxPendingMessages <= 0:
		return errors.New("consumer.max_pending_messages must be > 0")
	case p.Consumer.MaxRetries < -1:
		return errors.New("consumer.max_retries must be >= -1")
	case p.Consumer.OffsetsCommitInterval <= 0:
		return errors.New("consumer.offsets_commit_interval must be > 0")
	case p.Consumer.SubscriptionTimeout <= 0:
		return errors.New("consumer.subscription_timeout must be > 0")
	case p.Consumer.RetryBackoff <= 0:
		return errors.New("consumer.retry_backoff must be > 0")
	}

	// Validate TLS configuration.
	if err := p.validateTLS(); err != nil {
		return fmt.Errorf("invalid tls configuration: %q", err)
	}

	return nil
}

// validateTLS checks the TLS files referenced in the Kafka section.
//
// If CACertFile is set, it must be readable and contain at least one PEM
// certificate. If both ClientCertFile and ClientCertKeyFile are set, each
// must be readable and together they must form a valid key pair. The checks
// run whether or not kafka.tls is enabled. Error messages name the
// configuration keys exactly as they appear in the YAML file.
func (p *Proxy) validateTLS() error {
	// Validate the CA certificate
	if p.Kafka.CACertFile != "" {
		caCert, err := ioutil.ReadFile(p.Kafka.CACertFile)
		if err != nil {
			return fmt.Errorf("kafka.ca_certificate_file: %s", err)
		}
		if !x509.NewCertPool().AppendCertsFromPEM(caCert) {
			return errors.New("kafka.ca_certificate_file does not appear to be a valid CA certificate")
		}
	}

	// Validate the client certificate and key
	if p.Kafka.ClientCertFile != "" && p.Kafka.ClientCertKeyFile != "" {
		// Read each file separately first so that an unreadable file is
		// reported against the specific key it was configured with.
		if _, err := ioutil.ReadFile(p.Kafka.ClientCertFile); err != nil {
			return fmt.Errorf("kafka.client_certificate_file: %s", err)
		}
		if _, err := ioutil.ReadFile(p.Kafka.ClientCertKeyFile); err != nil {
			return fmt.Errorf("kafka.client_key_file: %s", err)
		}
		if _, err := tls.LoadX509KeyPair(p.Kafka.ClientCertFile, p.Kafka.ClientCertKeyFile); err != nil {
			return fmt.Errorf("kafka.client_certificate_file and kafka.client_key_file are not a valid certificate pair: %s", err)
		}
	}

	return nil
}

// newApp creates an application configuration with the default gRPC and HTTP
// listen addresses and an empty, ready-to-use proxy map.
func newApp() *App {
	return &App{
		GRPCAddr: "0.0.0.0:19091",
		TCPAddr:  "0.0.0.0:19092",
		Proxies:  make(map[string]*Proxy),
	}
}

// defaultProxyWithClientID returns a proxy configuration filled with default
// values and the given client ID. The default Kafka version is 0.10.2.1
// unless the KAFKA_VERSION environment variable holds a version string that
// KafkaVersion.UnmarshalText accepts.
func defaultProxyWithClientID(clientID string) *Proxy {
	proxy := &Proxy{ClientID: clientID}

	zk := &proxy.ZooKeeper
	zk.SeedPeers = []string{"localhost:2181"}
	zk.SessionTimeout = 15 * time.Second

	kafka := &proxy.Kafka
	kafka.SeedPeers = []string{"localhost:9092"}
	kafka.Version.v = sarama.V0_10_2_1
	// If a valid Kafka version provided in an environment variable then use it
	// as the default value. This logic is only needed in tests.
	var envVersion KafkaVersion
	if envVersion.UnmarshalText([]byte(os.Getenv("KAFKA_VERSION"))) == nil {
		kafka.Version = envVersion
	}

	netCfg := &proxy.Net
	netCfg.DialTimeout = 30 * time.Second
	netCfg.ReadTimeout = 30 * time.Second
	netCfg.WriteTimeout = 30 * time.Second

	prod := &proxy.Producer
	prod.ChannelBufferSize = 4096
	prod.MaxMessageBytes = 1000000
	prod.Compression = Compression(sarama.CompressionSnappy)
	prod.FlushFrequency = 500 * time.Millisecond
	prod.FlushBytes = 1024 * 1024
	prod.RequiredAcks = RequiredAcks(sarama.WaitForAll)
	prod.RetryBackoff = 10 * time.Second
	prod.RetryMax = 6
	prod.ShutdownTimeout = 30 * time.Second
	prod.Partitioner = PartitionerConstructor("hash")
	prod.Timeout = 10 * time.Second

	cons := &proxy.Consumer
	cons.AckTimeout = 300 * time.Second
	cons.ChannelBufferSize = 64
	cons.FetchMaxBytes = 1024 * 1024
	cons.FetchMaxWait = 250 * time.Millisecond
	cons.LongPollingTimeout = 3 * time.Second
	cons.MaxPendingMessages = 300
	cons.MaxRetries = -1
	cons.OffsetsCommitInterval = 500 * time.Millisecond
	cons.SubscriptionTimeout = 15 * time.Second
	cons.RetryBackoff = 500 * time.Millisecond

	return proxy
}

// newClientID builds the ID that identifies this Kafka-Pixy instance in both
// Kafka and ZooKeeper. Every ID starts with "kp_" and continues with:
//   - 4 random bytes in hex, if the hostname cannot be determined;
//   - the hostname and the short Docker container ID, if one is found;
//   - the hostname and the process ID otherwise.
func newClientID() string {
	const prefix = "kp_"

	host, err := os.Hostname()
	if err != nil {
		var token [4]byte
		_, _ = rand.Read(token[:])
		return prefix + hex.EncodeToString(token[:])
	}
	if cid, err := getDockerCID(); err == nil {
		return prefix + host + "_" + cid
	}
	return prefix + host + "_" + strconv.Itoa(os.Getpid())
}

// getDockerCID returns the first 12 characters of the Docker container ID
// found in /proc/self/cgroup.
//
// It scans the file line by line and uses the first line that contains
// exactly one "/docker/" separator followed by at least 12 characters. It
// returns an error if the file cannot be opened or read, or if no such line
// exists.
func getDockerCID() (string, error) {
	const shortIDLen = 12

	f, err := os.Open("/proc/self/cgroup")
	if err != nil {
		return "", errors.Wrap(err, "cannot open /proc/self/cgroup")
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		parts := strings.Split(scanner.Text(), "/docker/")
		if len(parts) != 2 {
			continue
		}

		fullDockerCID := parts[1]
		if len(fullDockerCID) < shortIDLen {
			// Too short to be a container ID; slicing it would panic.
			continue
		}
		return fullDockerCID[:shortIDLen], nil
	}
	// Scan also stops on a read error; report it rather than claiming that
	// the cgroup entry is missing.
	if err := scanner.Err(); err != nil {
		return "", errors.Wrap(err, "cannot read /proc/self/cgroup")
	}
	return "", errors.New("cannot find Docker cgroup")
}

// proxyProb is a helper used by FromYAML to decode only the proxies section
// of the configuration as a yaml.MapSlice, which FromYAML then iterates to
// pick the first listed cluster as the default one.
type proxyProb struct {
	// Proxies holds the raw key/value pairs of the proxies section.
	Proxies yaml.MapSlice
}

// TLS holds the paths to the certificate and key that GRPCSecurityOpts uses
// to secure the gRPC API server. Security is applied only when both paths
// are set.
type TLS struct {
	// CertPath is the path to the server certificate file.
	CertPath string `yaml:"certificate_path"`
	// KeyPath is the path to the private key file matching CertPath.
	KeyPath string `yaml:"key_path"`
}

// GRPCSecurityOpts returns the gRPC server options needed to serve over TLS.
// The result is an empty, non-nil slice unless both the certificate and the
// key paths are configured. An error is returned if either file cannot be
// read or if the two do not form a valid key pair.
func (a *App) GRPCSecurityOpts() ([]grpc.ServerOption, error) {
	opts := []grpc.ServerOption{}
	// use security only if both cert and key paths are set
	if a.TLS.CertPath == "" || a.TLS.KeyPath == "" {
		return opts, nil
	}

	// LoadX509KeyPair reads the certificate file, then the key file, and
	// parses them as a pair.
	keyPair, err := tls.LoadX509KeyPair(a.TLS.CertPath, a.TLS.KeyPath)
	if err != nil {
		return nil, err
	}
	creds := credentials.NewServerTLSFromCert(&keyPair)
	return append(opts, grpc.Creds(creds)), nil
}
